package com.siyoumi.app.rabbitmq;

import com.rabbitmq.client.*;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;

/**
 * Opens a RabbitMQ connection/channel pair and registers consumers on that channel.
 *
 * <p>Instances are obtained through the {@code getIns(...)} factories, which create the
 * connection and channel eagerly. Call {@link #close()} (or use try-with-resources) when the
 * handle is no longer needed.
 */
@Slf4j
public class RabbitMqConsumerHandle implements AutoCloseable {
    @Getter
    private Channel channel;
    private Connection connection;
    /**
     * Connection factory created by this handle from an explicit {@link RabbitMqProperties};
     * {@code null} when the shared factory from {@link RabbitMqConfig} is used. Only a factory
     * owned by this handle is destroyed in {@link #close()}.
     */
    private CachingConnectionFactory ownedFactory;
    /**
     * Original author's description: {@code true} means the channel disconnects automatically.
     *
     * <p>NOTE(review): this value is forwarded to Spring's
     * {@code Connection.createChannel(boolean transactional)}, whose parameter selects a
     * transactional channel rather than an auto-closing one - confirm that this is intended.
     */
    @Setter
    private Boolean autoClose = true;

    /**
     * Creates a handle on the shared connection factory with {@code autoClose = true}.
     *
     * @return an initialised handle
     */
    public static RabbitMqConsumerHandle getIns() {
        return getIns(null, true);
    }

    /**
     * Creates a handle on the shared connection factory.
     *
     * @param autoClose flag passed to the channel creation; {@code null} is treated as {@code true}
     * @return an initialised handle
     */
    public static RabbitMqConsumerHandle getIns(Boolean autoClose) {
        return getIns(null, autoClose);
    }

    /**
     * Creates a handle and immediately opens its connection and channel.
     *
     * @param config    broker settings; {@code null} uses {@code RabbitMqConfig.getConnectionFactory()}
     * @param autoClose flag passed to the channel creation; {@code null} is treated as {@code true}
     * @return an initialised handle
     */
    public static RabbitMqConsumerHandle getIns(RabbitMqProperties config, Boolean autoClose) {
        RabbitMqConsumerHandle handle = new RabbitMqConsumerHandle();
        handle.setAutoClose(autoClose);

        handle.init(config);

        return handle;
    }

    /**
     * Opens the connection and the channel.
     *
     * <p>If opening fails part-way, everything created so far is released before the
     * exception is rethrown.
     *
     * @param config broker settings, or {@code null} to use the shared connection factory
     */
    @SneakyThrows
    protected void init(RabbitMqProperties config) {
        CachingConnectionFactory factory;
        if (config == null) {
            factory = RabbitMqConfig.getConnectionFactory();
        } else {
            factory = new CachingConnectionFactory();
            // NOTE(review): when addresses are set, the port below may be ignored unless the
            // address string itself carries "host:port" - confirm against the configuration.
            factory.setAddresses(config.getHost());
            factory.setPort(config.getPort());
            factory.setUsername(config.getUsername());
            factory.setPassword(config.getPassword());
            factory.setVirtualHost(config.getVhost());
            // Remember the private factory so close() can destroy it.
            ownedFactory = factory;
        }

        // A null flag falls back to the field default instead of failing on auto-unboxing.
        boolean channelFlag = autoClose == null || autoClose;

        try {
            connection = factory.createConnection();
            channel = connection.createChannel(channelFlag);
        } catch (RuntimeException e) {
            try {
                releaseResources();
            } catch (Exception cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw e;
        }
    }

    /**
     * Starts consuming from a queue with manual acknowledgement.
     *
     * @param consumer  callback that receives the deliveries; it is given this handle's channel
     *                  through {@code consumer.init(...)} before consumption starts
     * @param queueName name of the queue to consume from
     */
    @SneakyThrows
    public void consumeMessage(RabbitMqConsumer consumer, String queueName) {
        /* Flow control:
          param1: prefetchCount, the maximum number of unacknowledged messages pushed to the consumer at once
          param2: global, whether the limit applies to the whole channel; false applies it per consumer
         */
        getChannel().basicQos(1, false);

        consumer.init(getChannel());

        // Arguments: queue name, autoAck = false (manual acknowledgement), delivery callback.
        getChannel().basicConsume(queueName, false, consumer);
    }

    /**
     * Closes the channel and the connection, and destroys the connection factory when it was
     * created by this handle. Each step runs even if an earlier one throws.
     *
     * @throws Exception if closing the channel or the connection fails
     */
    @Override
    public void close() throws Exception {
        releaseResources();
    }

    /** Releases channel, connection and any owned factory, attempting every step. */
    private void releaseResources() throws Exception {
        try {
            if (channel != null) {
                channel.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if (ownedFactory != null) {
                    // Closing the connection proxy may leave the underlying connection open;
                    // destroying the private factory releases it.
                    ownedFactory.destroy();
                }
            }
        }
    }
}